/*
 * Lucas Fialho Zawacki
 * Paulo Renato Lanzarin
 * (C) Copyright 2017 Bigbluebutton
 *
 */

'use strict'

const C = require('../bbb/messages/Constants');
const MediaHandler = require('../media-handler');
const Messaging = require('../bbb/messages/Messaging');
const Logger = require('../utils/Logger');
const BaseProvider = require('../base/BaseProvider');
const config = require('config');
const localIpAddress = config.get('localIpAddress');
const EventEmitter = require('events').EventEmitter;
const SHOULD_RECORD = config.get('recordScreenSharing');
const KEYFRAME_INTERVAL = config.get('screenshareKeyframeInterval');
const ENABLE_SCREENSHARE_FLASH_BRIDGE = config.has('screenshareEnableFlashRTPBridge')
  ? config.get('screenshareEnableFlashRTPBridge')
  : false;

const LOG_PREFIX = "[screenshare]";

// Global MCS endpoints mapping. These hashes map IDs generated by the mcs-core
// lib to the ones generated in the ScreenshareManager
let sources = {};
let rtpEndpoints = {};

module.exports = class Screenshare extends BaseProvider {
  constructor(id, bbbGW, voiceBridge, userId, vh, vw, meetingId, mcs) {
    super(bbbGW);
    this.sfuApp = C.SCREENSHARE_APP;
    this.mcs = mcs;
    this.mcsUserId;
    this.userId = userId;
    this._connectionId = id;
    this._presenterEndpoint = null;
    this._ffmpegEndpoint = null;
    this._voiceBridge = voiceBridge;
    this._meetingId = meetingId;
    this._streamUrl = "";
    this._vw = vw;
    this._vh = vh;
    this._presenterCandidatesQueue = [];
    this._viewerEndpoints = [];
    this._viewersCandidatesQueue = [];
    this._status = C.MEDIA_STOPPED;
    this._rtmpBroadcastStarted = false;
    this.recording = {};
    this.isRecorded = false;
    this._recordingSubPath = 'screenshare';
  }

  async onIceCandidate (candidate, role, userId, connectionId) {
    Logger.debug(LOG_PREFIX, "onIceCandidate", role, userId, candidate);
    switch (role) {
      case C.SEND_ROLE:
        if (this._presenterEndpoint) {
          try {
            this.flushCandidatesQueue(this.mcs, [...this._presenterCandidatesQueue], this._presenterEndpoint);
            this._presenterCandidatesQueue = [];

            await this.mcs.addIceCandidate(this._presenterEndpoint, candidate);
          } catch (err) {
            this._handleError(LOG_PREFIX, err, role, userId);
          }
        } else {
          Logger.debug(LOG_PREFIX, "Pushing ICE candidate to presenter queue");
          this._presenterCandidatesQueue.push(candidate);
        }
      case C.RECV_ROLE:
        let endpoint = this._viewerEndpoints[connectionId];
        if (endpoint) {
          try {
            this.flushCandidatesQueue(this.mcs, [...this._viewersCandidatesQueue[connectionId]], endpoint);
            this._viewersCandidatesQueue[connectionId] = [];

            await this.mcs.addIceCandidate(endpoint, candidate);
          } catch (err) {
            this._handleError(LOG_PREFIX, err, role, userId);
          }
        } else {
          this._viewersCandidatesQueue[connectionId] = [];
          Logger.debug(LOG_PREFIX, "Pushing ICE candidate to viewer queue", userId);
          this._viewersCandidatesQueue[connectionId].push(candidate);
        }
        break;
      default:
        Logger.warn(LOG_PREFIX, "Unknown role", role);
      }
  }

  _mediaStateRTP (event, endpoint) {
    const { mediaId, state } = event;
    const { name, details = null } = state;

    if (mediaId !== endpoint) {
      return;
    }

    switch (name) {
      case "MediaStateChanged":
        break;

      case "MediaFlowOutStateChange":
        Logger.info(LOG_PREFIX, "Session with media", mediaId, "received state", state);
        break;

      case "MediaFlowInStateChange":
        Logger.info(LOG_PREFIX, "Session with media", mediaId, "received state", state);
        if (details === 'FLOWING') {
          this._onRtpMediaFlowing();
        }
        else {
          this._onRtpMediaNotFlowing();
        }
        break;

      default: Logger.warn(LOG_PREFIX, "Unrecognized event", event);
    }
  }

  _onMCSIceCandidate (event, connectionId, endpoint) {
    const { mediaId, candidate } = event;

    if (mediaId !== endpoint) {
      return;
    }

    Logger.debug(LOG_PREFIX, 'Received ICE candidate from mcs-core for media session', mediaId, '=>', candidate, "for connection", connectionId);

    this.bbbGW.publish(JSON.stringify({
      connectionId,
      type: C.SCREENSHARE_APP,
      id : 'iceCandidate',
      cameraId: this._connectionId,
      candidate : candidate
    }), C.FROM_SCREENSHARE);

  }

  _mediaStateWebRTC (event, endpoint) {
    const { mediaId , state } = event;
    const { name, details } = state;

    if (mediaId !== endpoint) {
      return;
    }

    switch (name) {
      case "MediaStateChanged":
        break;

      case "MediaFlowOutStateChange":
      case "MediaFlowInStateChange":
        Logger.info(LOG_PREFIX, "WebRTC session with media", mediaId, "received state", state);
        if (!ENABLE_SCREENSHARE_FLASH_BRIDGE) {
          if (details === 'FLOWING') {
            this._onWebRTCMediaFlowing();
          }
          else {
            this._onWebRTCMediaNotFlowing();
          }
        }
        break;

      case C.MEDIA_SERVER_OFFLINE:
        Logger.error(LOG_PREFIX, "Screenshare provider received MEDIA_SERVER_OFFLINE event", event);
        this.emit(C.MEDIA_SERVER_OFFLINE, event);
        break;


      default: Logger.warn(LOG_PREFIX, "Unrecognized event", event);
    }
  }

  _recordingState (event, endpoint) {
    const { mediaId , state } = event;
    const { name, details } = state;

    if (mediaId !== endpoint) {
      return;
    }

    Logger.info(LOG_PREFIX, 'Recording status for rec', mediaId, 'for screenshare at', this._meetingId, "is", name, state, details);
  }

  async startRecording() {
    return new Promise(async (resolve, reject) => {
      try {
        const recordingPath = this.getRecordingPath(this._meetingId, this._recordingSubPath, this._voiceBridge);
        const recordingId = await this.mcs.startRecording(this.mcsUserId, this._presenterEndpoint, recordingPath);
        this.recording = { recordingId, filename: recordingPath };
        this.mcs.onEvent(C.MEDIA_STATE, this.recording.recordingId, (event) => {
          this._recordingState(event, this.recording.recordingId);
        });
        this.sendStartShareEvent();
        resolve(this.recording);
      } catch (err) {
        reject(this._handleError(LOG_PREFIX, err));
      }
    });
  }

  sendStartShareEvent () {
    let shareEvent = Messaging.generateWebRTCShareEvent('StartWebRTCDesktopShareEvent', this._meetingId, this.recording.filename);
    this.bbbGW.writeMeetingKey(this._meetingId, shareEvent, function(error) {
      Logger.warn(LOG_PREFIX, 'Error writing START share event error', error);
    });
  }

  sendStopShareEvent () {
    let shareEvent = Messaging.generateWebRTCShareEvent('StopWebRTCDesktopShareEvent', this._meetingId , this.recording.filename);
    this.bbbGW.writeMeetingKey(this._meetingId, shareEvent, function(error) {
      Logger.warn(LOG_PREFIX, 'Error writing STOP share event error', error);
    });
  }

  start (sessionId, connectionId, sdpOffer, userId, role) {
    return new Promise(async (resolve, reject) => {
      this._status = C.MEDIA_STARTING;
      // Probe akka-apps to see if this is to be recorded
      if (SHOULD_RECORD && role === C.SEND_ROLE) {
        this.isRecorded = await this.probeForRecordingStatus(this._meetingId, userId);
      }

      Logger.info(LOG_PREFIX, "Starting session", this._voiceBridge + '-' + role);
      if (!this.mcsUserId) {
        try {
          this.mcsUserId = await this.mcs.join(this._voiceBridge, 'SFU', {});
          Logger.info(LOG_PREFIX, "MCS Join for", this._connectionId, "returned", this.mcsUserId);

        }
        catch (error) {
          Logger.error(LOG_PREFIX, "MCS Join returned error =>", error);
          return reject(this._handleError(LOG_PREFIX, error, role, userId));
        }
      }

      if (role === C.RECV_ROLE) {
        try {
          const sdpAnswer = await this._startViewer(connectionId, this._voiceBridge, sdpOffer, userId, this._presenterEndpoint)
          return resolve(sdpAnswer);
        }
        catch (err) {
          return reject(this._handleError(LOG_PREFIX, err, role, userId));
        }
      }

      if (role === C.SEND_ROLE) {
        try {
          const sdpAnswer = await this._startPresenter(sdpOffer);
          return resolve(sdpAnswer);
        }
        catch (err) {
          return reject(this._handleError(LOG_PREFIX, err, role, userId));
        }
      }
    });
  }

  _startPresenter (sdpOffer) {
    return new Promise(async (resolve, reject) => {
      try {
        const presenterSdpAnswer = await this._publishWebRTCStream(sdpOffer);
        await this.mcs.setContentFloor(this._voiceBridge, this._presenterEndpoint);

        resolve(presenterSdpAnswer);
      }
      catch (err) {
        Logger.error(LOG_PREFIX, "MCS publish returned error =>", err);
        reject(this._handleError(LOG_PREFIX, err));
      }

      if (ENABLE_SCREENSHARE_FLASH_BRIDGE) {
        try {
          await this._upstartRTPStream();
        } catch (e) {
          Logger.error(`${LOG_PREFIX} Screenshare RTP-RTMP bridge could not be started at ${this._voiceBridge}`, e)
        }
      }
    });
  }

  async _publishWebRTCStream (descriptor) {
    try {
      const options = {
        descriptor,
        name: this._assembleStreamName('publish', this.userId, this._voiceBridge),
        mediaProfile: 'content',
      };

      const { mediaId, answer } = await this.mcs.publish(this.mcsUserId, this._voiceBridge, C.WEBRTC, options);
      this._presenterEndpoint = mediaId;

      this.mcs.onEvent(C.MEDIA_STATE, this._presenterEndpoint, (event) => {
        this._mediaStateWebRTC(event, this._presenterEndpoint)
      });

      this.mcs.onEvent(C.MEDIA_STATE_ICE, this._presenterEndpoint, (event) => {
        this._onMCSIceCandidate(event, this._connectionId, this._presenterEndpoint);
      });

      sources[this._voiceBridge] = this._presenterEndpoint;
      const presenterSdpAnswer = answer;
      this.flushCandidatesQueue(this.mcs, [...this._presenterCandidatesQueue], this._presenterEndpoint);
      this._presenterCandidatesQueue = [];

      Logger.info(LOG_PREFIX, "MCS publish for user", this.mcsUserId, "returned", this._presenterEndpoint);
      return presenterSdpAnswer;
    }
    catch (err) {
      throw err;
    }
  }

  async _fetchContentFloor () {
    try {
      const { floor } = await this.mcs.getContentFloor(this._voiceBridge);
      const contentFloorId = floor.mediaId;
      Logger.debug(LOG_PREFIX, "Content floor for", this._voiceBridge, "is", contentFloorId);
      return contentFloorId;
    } catch (e) {
      throw e;
    }
  }

  async _upstartRTPStream () {
    try {
      const sendVideoPort = MediaHandler.getVideoPort();
      const rtpSdpOffer = MediaHandler.generateVideoSdp(localIpAddress, sendVideoPort);
      const options = {
        descriptor: rtpSdpOffer,
        keyframeInterval: KEYFRAME_INTERVAL,
        mediaProfile: 'content'
      }

      if (this._presenterEndpoint == null) {
        this._presenterEndpoint = await this._fetchContentFloor();;
      }

      const { mediaId, answer } = await this.mcs.subscribe(this.mcsUserId,
        this._presenterEndpoint, C.RTP, options);

      this._ffmpegEndpoint = mediaId;
      rtpEndpoints[this._voiceBridge] = this._ffmpegEndpoint;

      const recvVideoPort = answer.match(/m=video\s(\d*)/)[1];
      const ipRegex = /(IP4\s)([0-9.]*)/g;
      const mediaServerIp = ipRegex.exec(answer)[2];
      this._rtpParams = MediaHandler.generateTranscoderParams(mediaServerIp, localIpAddress,
          sendVideoPort, recvVideoPort, this._meetingId, "stream_type_video", C.RTP_TO_RTMP, "copy", this.userId, this._voiceBridge);

      this.mcs.onEvent(C.MEDIA_STATE, this._ffmpegEndpoint, (event) => {
        this._mediaStateRTP(event, this._ffmpegEndpoint);
      });

      Logger.info(LOG_PREFIX, "MCS subscribe for user", this.mcsUserId, "returned", this._ffmpegEndpoint);
    } catch (err) {
      throw err;
    }
  }

  _startViewer(connectionId, voiceBridge, sdpOffer, userId, presenterEndpoint) {
    return new Promise(async (resolve, reject) => {
      Logger.info(LOG_PREFIX, "Starting viewer", userId, "for voiceBridge", this._voiceBridge);
      let sdpAnswer;

      this._viewersCandidatesQueue[connectionId] = [];

      try {
        const options = {
          descriptor: sdpOffer,
          name: this._assembleStreamName('subscribe', userId, this._voiceBridge),
          mediaProfile: 'content',
        }

        if (this._presenterEndpoint == null) {
          this._presenterEndpoint = await this._fetchContentFloor();
        }

        const { mediaId, answer } = await this.mcs.subscribe(this.mcsUserId,
          this._presenterEndpoint, C.WEBRTC, options);

        this._viewerEndpoints[connectionId] = mediaId;
        sdpAnswer = answer;

        this.flushCandidatesQueue(this.mcs, [...this._viewersCandidatesQueue[connectionId]], this._viewerEndpoints[connectionId]);
        this._viewersCandidatesQueue[connectionId] = [];

        this.mcs.onEvent(C.MEDIA_STATE, mediaId, (event) => {
          this._mediaStateWebRTC(event, mediaId)
        });

        this.mcs.onEvent(C.MEDIA_STATE_ICE, mediaId, (event) => {
          this._onMCSIceCandidate(event, connectionId, mediaId);
        });

        Logger.info(LOG_PREFIX, "MCS subscribe returned for user", this.mcsUserId, "returned", this._viewerEndpoints[connectionId], "at userId", userId);
        return resolve(sdpAnswer);
      }
      catch (err) {
        Logger.error(LOG_PREFIX, "MCS veiwer subscribe returned error =>", err);
        return reject(this._handleError(LOG_PREFIX, err));
      }
    });
  }

  async stopRecording() {
    this.sendStopShareEvent();

    try {
      await this.mcs.stopRecording(this.mcsUserId, this.recording.recordingId);
    } catch (e) {
      // Logging it in case it still happens, but recording should be stopped
      // if it errors out inside mcs-core or if we call mcs.leave for this user
      // so it'll always be stopped. If anything pops here, probably related
      // to it already being stopped or it wasn't started in the first place
      Logger.warn(`${LOG_PREFIX} Failed to stop recording at ${this._voiceBridge}`, e);
    }
  }

  stop () {
    return new Promise(async (resolve, reject) => {
      try {
        if (this._status === C.MEDIA_STOPPED) {
          return resolve();
        }
        Logger.info('[screnshare] Stopping and releasing endpoints for MCS user', this.mcsUserId);
        await this._stopScreensharing();
        // Check if properly started the recording before trying to stop it
        if (this.isRecorded && this.recording && this.recording.recordingId) {
          this.stopRecording();
        }

        this._status = C.MEDIA_STOPPED;

        this.mcs.releaseContentFloor(this._voiceBridge, this._presenterEndpoint);
        await this.mcs.leave(this._voiceBridge, this.mcsUserId);
        Logger.info(LOG_PREFIX, "Left MCS room", this._voiceBridge);
        delete sources[this._presenterEndpoint];
        this._candidatesQueue = null;
        this._presenterEndpoint = null;
        this._ffmpegEndpoint = null;
        return resolve();
      }
      catch (err) {
        this._handleError(LOG_PREFIX, err);
        Logger.error(LOG_PREFIX, 'MCS returned an error when trying to leave =>', err);
        return resolve();
      }
    });
  }

  /**
   * Stops the screenshare broadcast, going through the transcoder when its
   * channel is available.
   *
   * When the transcoder channel is available, one-shot listeners are
   * registered for both the 1.1 and the 2x stop-transcoder replies and the
   * stop request is published; each listener emits the broadcast stopped
   * event and resolves. When the transcoder is not available, or the session
   * never reached C.MEDIA_STARTED, the broadcast stopped event is emitted
   * right away and the promise resolves immediately.
   *
   * NOTE(review): when the transcoder is available and the status is
   * C.MEDIA_STARTED, resolution depends solely on one of the replies
   * arriving; no timeout is visible in this method.
   *
   * @returns {Promise} always resolves (with no value); errors are passed to
   *   _handleError, logged and swallowed
   */
  _stopScreensharing() {
    return new Promise(async (resolve, reject) => {
      try {
        Logger.info(LOG_PREFIX, "Stopping screensharing with status", this._status);
        const isTranscoderAvailable = await this.bbbGW.isChannelAvailable(C.TO_BBB_TRANSCODE_SYSTEM_CHAN);
        // The meeting ID is passed for both arguments of the stop request
        const strm = Messaging.generateStopTranscoderRequestMessage(this._meetingId, this._meetingId);

        if (isTranscoderAvailable) {
          // Interoperability: capturing 1.1 stop_transcoder_reply messages
          this.bbbGW.once(C.STOP_TRANSCODER_REPLY+this._meetingId, async (payload) => {
            const meetingId = payload[C.MEETING_ID];
            await this._stopRtmpBroadcast(meetingId);
            return resolve();
          });

          // Capturing stop transcoder responses from the 2x model
          this.bbbGW.once(C.STOP_TRANSCODER_RESP_2x+this._meetingId, async (payload) => {
            const meetingId = payload[C.MEETING_ID_2x];
            await this._stopRtmpBroadcast(meetingId);
            return resolve();
          });

          // The listeners above are registered before publishing the request
          this.bbbGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {});
        }

        // Either the transcoder is not available or screensharing couldn't be
        // correctly started
        if (this._status != C.MEDIA_STARTED || !isTranscoderAvailable) {
          // Not awaited: the stopped event is emitted and we resolve at once.
          // If the listeners above were registered, a later reply would still
          // trigger them (resolve() is then a no-op).
          this._stopRtmpBroadcast(this._meetingId);
          return resolve();
        }
      }
      catch (err) {
        // Best effort: report the failure but never reject
        this._handleError(LOG_PREFIX, err);
        Logger.error(err);
        resolve();
      }
    });
  }

  async _onWebRTCMediaFlowing () {
    if (!this._rtmpBroadcastStarted) {
      Logger.info(LOG_PREFIX, "WebRTC Media FLOWING for meeting", this._voiceBridge);
      // Checking if transcoder is avaiable; if so, transposes the stream to RTMP
      this._startRtmpBroadcast(this._meetingId);

      if (this._status != C.MEDIA_STARTED) {
        Logger.info(LOG_PREFIX, 'webRTC started flowing for meeting', this._voiceBridge);
        if (this.isRecorded) {
          this.startRecording();
        }
        this._status = C.MEDIA_STARTED;
      }
    }
  };

  async _onRtpMediaFlowing() {
    if (!this._rtmpBroadcastStarted) {
      Logger.info(LOG_PREFIX, "RTP Media FLOWING for meeting", this._voiceBridge);
      const isTranscoderAvailable = await this.bbbGW.isChannelAvailable(C.TO_BBB_TRANSCODE_SYSTEM_CHAN);
      const strm = Messaging.generateStartTranscoderRequestMessage(this._meetingId, this._meetingId, this._rtpParams);

      // Checking if transcoder is avaiable; if so, transposes the stream to RTMP
      if (isTranscoderAvailable) {
        // Interoperability: capturing 1.1 start_transcoder_reply messages
        this.bbbGW.once(C.START_TRANSCODER_REPLY+this._meetingId, (payload) => {
          let meetingId = payload[C.MEETING_ID];
          let output = payload["params"].output;
          this._startRtmpBroadcast(meetingId, output);
        });

        // Capturing stop transcoder responses from the 2x model
        this.bbbGW.once(C.START_TRANSCODER_RESP_2x+this._meetingId, (payload) => {
          let meetingId = payload[C.MEETING_ID_2x];
          let output = payload["params"].output;
          this._startRtmpBroadcast(meetingId, output);
        });

        this.bbbGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {});
      } else {
        // transcoder is not available, pure WebRTC environment
        this._startRtmpBroadcast(this._meetingId);
      }

      if (this._status != C.MEDIA_STARTED) {
        Logger.info(LOG_PREFIX, 'webRTC started flowing for meeting', this._voiceBridge);
        if (this.isRecorded) {
          this.startRecording();
        }
        this._status = C.MEDIA_STARTED;
      }
    }
  };

  _stopRtmpBroadcast (meetingId) {
    return new Promise((resolve, reject) => {
      Logger.info(LOG_PREFIX, "_stopRtmpBroadcast for meeting", meetingId);
      let timestamp = Math.floor(new Date());
      let dsrstom = Messaging.generateScreenshareRTMPBroadcastStoppedEvent2x(this._voiceBridge,
        this._voiceBridge, this._streamUrl, this._vw, this._vh, timestamp);
      this.bbbGW.publish(dsrstom, C.FROM_VOICE_CONF_SYSTEM_CHAN);
      resolve();
    });
  }

  _startRtmpBroadcast (meetingId, output) {
    Logger.info(LOG_PREFIX, "_startRtmpBroadcast for meeting", + meetingId);
    const timestamp = Math.floor(new Date());

    // If output is defined, we're using a RTMP url for the flash bridge
    // Else just pass the WebRTC stream name
    if (output) {
      this._streamUrl = MediaHandler.generateStreamUrl(localIpAddress, meetingId, output);
    } else {
      this._streamUrl = this._presenterEndpoint;
    }

    let dsrbstam = Messaging.generateScreenshareRTMPBroadcastStartedEvent2x(this._voiceBridge,
      this._voiceBridge, this._streamUrl, this._vw, this._vh, timestamp);

    this.bbbGW.publish(dsrbstam, C.FROM_VOICE_CONF_SYSTEM_CHAN, function(error) {});
    this._rtmpBroadcastStarted = true;
  }

  _onWebRTCMediaNotFlowing () {
    // FIXME properly implement this when we have a client-side reconnection procedure
    Logger.warn(LOG_PREFIX, "TODO WebRTC source NOT_FLOWING in meeting", this._voiceBridge);
  }

  _onRtpMediaNotFlowing() {
    Logger.warn(LOG_PREFIX, "TODO RTP NOT_FLOWING");
  }

  async stopViewer(id) {
    let viewer = this._viewerEndpoints[id];
    Logger.info(LOG_PREFIX, 'Releasing endpoints for', viewer, 'from', this._presenterEndpoint);

    if (viewer) {
      try {
        await this.mcs.unsubscribe(this.mcsUserId, viewer);
        this._viewersCandidatesQueue[id] = null;
        this._viewerEndpoints[id] = null;
        return;
      }
      catch (err) {
        this._handleError(LOG_PREFIX, err);
        Logger.error(LOG_PREFIX, 'MCS returned error when trying to unsubscribe', err);
        return;
      }
    }
  }
};
